快速搞定线程池源码
关注“Java后端技术全栈”
回复“面试”获取最新资料
回复“加群”邀您进技术交流群
何为线程池?
线程池的优点:降低了线程频繁创建、销毁的开销,提高系统的响应速度,方便统一管理创建的线程。
java.util.concurrent.ThreadPoolExecutor
线程池(ThreadPoolExecutor)提供 4 个默认的构造方法,固定参数 5 个。如下图:
核心参数解释如下:
核心线程数量:corePoolSize
最大线程数量:maximumPoolSize
非核心线程空闲等待时间:keepAliveTime
等待时间单位:timeUnit
等待阻塞队列:blockingQueue<Runnable>
(可选)线程工厂创建线程:threadFactory
(可选)线程池拒绝策略:rejectedExecutionHandler
线程池原理
corePoolSize
当 allowCoreThreadTimeout= true
时,核心线程没有任务且存活时间超过空闲等待时间后终止。
maximumPoolSize
当 currentThreadNumber >= corePoolSize
,且任务队列已满时,线程池会创建新线程来处理任务;当 currentThreadNumber =maxPoolSize
,且任务队列已满时,线程池会拒绝处理任务而抛出异常。
keepAliveTime
blockingQueue:线程池缓存队列存放待处理的线程任务。
ArrayBlockingQueue:指定大小的等待队列(FIFO);有界队列,创建时需要指定队列的大小。
LinkedBlockingQueue:基于链表的等待队列(FIFO);无界队列,创建时不指定队列大小,默认为 Integer.MAX_VALUE。
PriorityBlockingQueue:带有优先级的等待队列
SynchronizedQueue:不存放对象的等待队列;同步移交队列,直接新建一个线程来执行新来的任务。
threadFactory
rejectedExecutionHandler
AbortPolicy(默认策略):丢弃任务并抛出 RejectedExecutionException。
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务(舍弃最老的请求,即将队列头部任务舍弃)。
DiscardPolicy:不做任何处理,直接丢弃任务。
怎么创建线程池?
线程池的创建主要有 2 种方式
基于 ThreadPoolExecutor 的构造方法创建
Executors 执行器创建
阿里开发规范中是建议使用ThreadPoolExecutor 来创建线程池。
// ThreadPoolExecutor 最基础的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 核心线程、最大线程数量必须大于 0,且 最大线程数量 大于等于 核心线程数量,空闲等待时间大于 0
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
常见线程池有以下 4 种:
newFixedThreadPool:固定大小的线程池
newSingleThreadExecutor:单个线程线程池
newCachedThreadPool:缓存线程池
newScheduledThreadPool:调度线程池
newFixedThreadPool
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
// 固定大小是指: 核心线程数量 = 最大线程数量
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
直接使用线程池构造方法创建固定线程池,线程池中有且仅有固定数量的线程去执行待执行的任务。当“待执行的任务数量 > maximumPoolSize + blockingQueue.size()”时,会抛出异常 java.util.concurrent.RejectedExecutionException。
// 阻塞队列大小为 5
LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(5);
// 创建线程数量为 3 的固定大小线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, blockingQueue);
// 待提交执行任务数量为 10,允许执行任务的数量为 8
for (int i = 0; i< 10; i++) {
poolExecutor.submit(
() -> {
System.out.println(Thread.currentThread().getName());
});
}
newSingleThreadExecutor
ExecutorService threadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
// 核心线程数、最大线程数都为 1,阻塞队列大小为 Integer.MAX_VALUE
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool
ExecutorService threadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
// 线程池核心线程数为 0,最大为 Integer.MAX_VALUE,空闲等待时间未 60s
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newScheduledThreadPool
ExecutorService threadPool = Executors.newScheduledThreadPool(3);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
// 设置核心线程数量,默认最大线程数量为 Integer.MAX_VALUE
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
// ThreadPoolExecutor 父类构造方法
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
scheduleAtFixedRate:固定频率执行任务,即任务执行的频率保持不变。
// command:需要执行的任务;initialDelay:初始执行延迟时间;period:后续任务执行延迟时间;unit:延迟时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务执行延迟 1s【周期性的操作,period = 0,所有的任务都将在初始延迟后执行,没有周期性可言】
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleAtFixedRate(
()-> {
try {
// 假设任务执行时间为 3s,当 workTime <= period 时,延迟 period 时间后执行;当 workTime > period ,延迟 workTime 时间后执行
Thread.sleep(3000);
System.out.println("scheduleAtFixedRate " + Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format(new Date()) );
} catch (InterruptedException e) {
e.printStackTrace();
}
},
3,
1,
TimeUnit.SECONDS);
scheduleWithFixedDelay:固定的延时执行任务,指上一次执行成功之后和下一次开始执行的之前的时间。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务将在上一个任务执行成功 1 是后执行;
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleWithFixedDelay(
()-> {
try {
Thread.sleep(1000);
System.out.println("scheduleWithFixedDelay " + Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format(new Date()) );
} catch (InterruptedException e) {
e.printStackTrace();
}
},
3,
1,
TimeUnit.SECONDS);
线程池有哪些状态
RUNNING: 线程池的初始化状态,可以添加待执行的任务。
SHUTDOWN:线程池处于待关闭状态,不接收新任务仅处理已经接收的任务。
STOP:线程池立即关闭,不接收新的任务,放弃缓存队列中的任务并且中断正在处理的任务。
TIDYING:线程池自主整理状态,调用 terminated() 方法进行线程池整理。
TERMINATED:线程池终止状态。
判断线程池状态常用方法总结如下。
shutdown 和 shutdownNow 的联系和区别:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,不接收新的任务。
shutdownNow():立即终止线程池,尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
isShutdown、isTerminated、isTerminating 的区别:
isShutDown():调用 shutdown() 或 shutdownNow() 方法后返回 true。
isTerminated():线程池中不存在任何需要执行的任务时,返回 true。
isTerminating() 线程池调用 shutdown() 或 shutdownNow() 后但是没有完全终止返回 true。
常用方法
线程池任务执行 execute()和 submit() 方法。
execute():ThreadPoolExecutor 类实现 Executor 接口中的方法,无返回值。
submit():ExecutorService 接口中的方法,有返回值,可以利用这一特性对任务的 Future.get() 抛出异常进行处理。
Executor 框架成员及关系图如下:
线程池监控
核心线程数量:
// 线程池维持线程的最小存活数量与 allowCoreThreadTimeOut 参数有关
public int getCorePoolSize() {
return corePoolSize;
}
线程池最大的线程数量:
public int getMaximumPoolSize() {
return maximumPoolSize;
}
线程池最多创建的线程数量:
public int getLargestPoolSize() {
// 获取主线程锁,获取调用此方法时,线程池中曾创建的最大的线程数量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
// 添加待执行任务方法
private boolean addWorker(Runnable firstTask, boolean core){
...
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加待执行的任务
workers.add(w);
// 获取线程池中存在的工作线程的大小
int s = workers.size();
if (s > largestPoolSize)
// 赋值
largestPoolSize = s;
workerAdded = true;
}
...
}
线程池当前存在的线程数量:
public int getPoolSize() {
// 获取主线程锁,获取调用此方法时,线程池中存在线程数量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
} finally {
mainLock.unlock();
}
}
// 最近运行状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
线程池已完成任务数量:
// Worker 类属性获取线程任务计数器
volatile long completedTasks;
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
// 遍历线程池中的线程
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
线程池存在的任务总量:
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
// w.islocked() 获取当前任务的状态,调用 isHeldExclusively() 判断
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
// Work 内部类
public void lock() { acquire(1); } // 加锁
public boolean tryLock() { return tryAcquire(1); }// 尝试获取锁
public void unlock() { release(1); }// 释放锁
public boolean isLocked() { return isHeldExclusively(); }
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
好了,以上便是今天对线程池源码相关的分享。
码字不易。老田期待你来个:点赞...转发...
推荐阅读
目前10000+ 人已关注我们